package atguigu.com.edu.func;

import atguigu.com.edu.common.GmallConfig;
import atguigu.com.edu.util.PhoenixUtil;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;


/**
 * Flink sink that writes each incoming JSON record to Phoenix by building an
 * {@code upsert} statement and handing it to {@link PhoenixUtil#executeSql}.
 *
 * <p>Every record is expected to carry a {@code sink_table} entry naming the target
 * table; all remaining entries are used as column name / column value pairs.
 */
public class DimSinkFunction implements SinkFunction<JSONObject> {

    /** Key of the entry that names the target table; it is routing metadata, not a column. */
    private static final String SINK_TABLE_KEY = "sink_table";

    /**
     * Builds and executes an {@code upsert} statement for a single record.
     *
     * <p>The record itself is left unmodified: the {@code sink_table} entry is skipped
     * while the column list is assembled rather than removed from {@code jsonObj}.
     *
     * @param jsonObj record to write, e.g.
     *                {@code {"tm_name":"atguigu","sink_table":"dim_base_trademark","id":12}}
     * @param context sink context supplied by Flink (unused)
     * @throws Exception if {@link PhoenixUtil#executeSql} fails
     */
    @Override
    public void invoke(JSONObject jsonObj, Context context) throws Exception {
        // Name of the Phoenix table this record is written to.
        String sinkTable = jsonObj.getString(SINK_TABLE_KEY);

        // Assemble the column list and the quoted value list in the same iteration so
        // that the two are guaranteed to line up position by position.
        StringBuilder columns = new StringBuilder();
        StringBuilder values = new StringBuilder();
        boolean first = true;
        for (String column : jsonObj.keySet()) {
            if (SINK_TABLE_KEY.equals(column)) {
                continue;
            }
            if (!first) {
                columns.append(",");
                values.append("','");
            }
            first = false;

            columns.append(column);
            Object value = jsonObj.get(column);
            // A null value is rendered as an empty literal, matching what the previous
            // StringUtils.join based implementation produced.
            if (value != null) {
                values.append(escapeSqlLiteral(value.toString()));
            }
        }

        // Every value is written as a single-quoted literal.
        String upsertSql = "upsert into " + GmallConfig.PHOENIX_SCHEMA + "." + sinkTable
                + "(" + columns + ") "
                + " values('" + values + "')";
        System.out.println("向phoenix表中插入数据的SQL:" + upsertSql);

        PhoenixUtil.executeSql(upsertSql);
    }

    /**
     * Escapes text for use inside a single-quoted SQL string literal by doubling every
     * embedded single quote, so a value such as {@code O'Reilly} can no longer terminate
     * the literal early and corrupt (or inject into) the statement.
     *
     * <p>NOTE(review): only single quotes are escaped here. If the Phoenix version in use
     * also treats backslash as an escape character inside literals, backslashes need the
     * same treatment - confirm against the Phoenix grammar. Table and column names are
     * still concatenated as-is and are assumed to come from trusted configuration.
     */
    private static String escapeSqlLiteral(String text) {
        return text.replace("'", "''");
    }
}
